﻿using IOP.Decoder.MQTT;
using IOP.Models.Message.MQTT;
using IOP.Models.Message.MQTT.Package;
using IOP.Models.Queue;
using IOP.Models;
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;
using System.Timers;
using System.Collections.Generic;
using System.Collections.Concurrent;

namespace IOP.Pulsar.MQTT.Client
{
    /// <summary>
    /// MQTT客户端
    /// </summary>
    public class MQTTClient : IDisposable
    {
        /// <summary>
        /// 连接的嵌套字
        /// </summary>
        public Socket ConnectedSocket { get; private set; }
        /// <summary>
        /// 是否处于连接状态
        /// </summary>
        public bool IsConnected { get; private set; } = false;
        /// <summary>
        /// 是否以及释放连接
        /// </summary>
        public bool IsDispose { get; private set; } = false;
        /// <summary>
        /// 客户端Guid
        /// </summary>
        public readonly Guid ClientGuid = Guid.NewGuid();
        /// <summary>
        /// 客户端Id
        /// </summary>
        public string ClientId { get; private set; } = "";
        /// <summary>
        /// 生产线
        /// </summary>
        public MQTTClientProductLine ProductLine { get; set; } = new MQTTClientProductLine();
        /// <summary>
        /// 配置
        /// </summary>
        private MQTTOption Options { get; set; } = null;
        /// <summary>
        /// 包队列
        /// </summary>
        private LoopQueue<IMQTTPackage> Packages { get; set; } = new LoopQueue<IMQTTPackage>();
        /// <summary>
        /// 包缓存
        /// </summary>
        private readonly ConcurrentDictionary<ushort, CachePublishPackage> CachePackages = new ConcurrentDictionary<ushort, CachePublishPackage>();

        /// <summary>
        /// 计时器
        /// </summary>
        private readonly Timer Timer = new Timer();     
        private readonly byte[] Buffer = new byte[MINSIZE];
        private const int MINSIZE = 1500;

        private readonly Random Random = new Random();

        /// <summary>
        /// IO管线
        /// </summary>
        private Pipe Pipe { get; set; } = new Pipe();

        /// <summary>
        /// 连接
        /// </summary>
        public void Connect(Action<MQTTOption> option)
        {
            try
            {
                Options = new MQTTOption();
                option(Options);
                if (Options.ServerAddress == null) throw new ArgumentNullException(nameof(Options.ServerAddress));
                if (Options.Port == 0) throw new ArgumentNullException(nameof(Options.ServerAddress));
                Packages.OnQueueHasElements += Packages_OnQueueHasElements;
                IPEndPoint serverAddress = new IPEndPoint(Options.ServerAddress, Options.Port);
                ConnectedSocket = new Socket(SocketType.Stream, ProtocolType.Tcp);

                MQTTConnectOption connectOption = new MQTTConnectOption();
                connectOption.CleanSession = Options.CleanSession;
                if (!string.IsNullOrEmpty(Options.ClientIdentifier))
                {
                    connectOption.ClientIdentifier = Options.ClientIdentifier;
                    ClientId = Options.ClientIdentifier;
                }
                else ClientId = ClientGuid.ToString("N");
                connectOption.KeepAlive = Options.KeepAlive;
                connectOption.ProtocolLevel = Options.ProtocolLevel;
                if (Options.WillFlag)
                {
                    connectOption.WillFlag = Options.WillFlag;
                    connectOption.WillMessage = Options.WillMessage;
                    connectOption.WillQoS = Options.WillQoS;
                    connectOption.WillRetain = Options.WillRetain;
                    connectOption.WillTopic = Options.WillTopic;
                }
                if (!string.IsNullOrEmpty(Options.UserName))
                {
                    connectOption.UserNameFlag = true;
                    connectOption.UserName = Options.UserName;
                }
                if (Options.Password.Length > 0)
                {
                    connectOption.PasswordFlag = true;
                    connectOption.Password = Options.Password;
                }

                var keepAlive = Options.KeepAlive - 5;
                if (keepAlive <= 0) throw new MQTTClientException(this, "KeepAlive time settings is too short");
                Timer.Interval = keepAlive * 1000;
                Timer.Elapsed += Timer_Elapsed;
                Timer.Start();

                ConnectedSocket.Connect(serverAddress);
                ConnectPackage connect = new ConnectPackage(connectOption);
                ConnectedSocket.Send(connect.ToBytes());
                StartReceive(ConnectedSocket);
            }
            catch (Exception e)
            {
                IsConnected = false;
                ProductLine.ErrorHandle?.Invoke(this, e);
                Disconnect();
            }
        }

        /// <summary>
        /// 重连
        /// </summary>
        public void Reconnect()
        {
            if (Options == null) throw new ArgumentNullException(nameof(Options));
            if (!IsConnected) ConnectedSocket.Connect(Options.ServerAddress, Options.Port);
        }
        /// <summary>
        /// 重连
        /// </summary>
        /// <param name="option"></param>
        public void Reconnect(Action<MQTTOption> option)
        {
            if(IsConnected) ConnectedSocket.Disconnect(false);
            ConnectedSocket.Shutdown(SocketShutdown.Both);
            ConnectedSocket.Close();
            ConnectedSocket.Dispose();
            Connect(option);
        }

        /// <summary>
        /// 发送发布报文
        /// </summary>
        /// <param name="topicName"></param>
        /// <param name="body"></param>
        /// <param name="qoSType"></param>
        /// <param name="retain"></param>
        public void SendPublish(string topicName, byte[] body, QoSType qoSType = QoSType.QoS0, bool retain = false)
        {
            try
            {
                var packetId = CreatePacketIdentifier();
                PublishPackage publishPackage = new PublishPackage(topicName, packetId, body, qoSType, false, retain);
                if (publishPackage.QoS != QoSType.QoS0)
                {
                    publishPackage.PacketIdentifier = CreatePacketIdentifier(publishPackage.PacketIdentifier);
                    RepeatTask task = new RepeatTask(Options.RepeatCount, Options.RepeatInterval);
                    CachePublishPackage cache = new CachePublishPackage(publishPackage, task);
                    cache.TaskFinish += Cache_TaskFinish;
                    cache.RunTask(() => 
                    {
                        var newPublish = new PublishPackage(publishPackage.TopicName, packetId, publishPackage.Body, publishPackage.QoS, true);
                        SendPackage(newPublish);
                    });
                    CachePackages.TryAdd(publishPackage.PacketIdentifier, cache);
                }
                ConnectedSocket.Send(publishPackage.ToBytes());
            }
            catch (Exception e)
            {
                ProductLine.ErrorHandle?.Invoke(this, e);
                Disconnect();
            }
        }

        /// <summary>
        /// 发送订阅报文
        /// </summary>
        /// <param name="topicFilters"></param>
        public void SendSubscribe(TopicFilter[] topicFilters)
        {
            try
            {
                var packId = CreatePacketIdentifier();
                SubscribePackage subscribe = new SubscribePackage(packId, topicFilters);
                ConnectedSocket.Send(subscribe.ToBytes());
            }
            catch (Exception e)
            {
                ProductLine.ErrorHandle?.Invoke(this, e);
                Disconnect();
            }
        }
        /// <summary>
        /// 发送取消订阅报文
        /// </summary>
        /// <param name="topicFilters"></param>
        public void SendUnsubscribe(string[] topicFilters)
        {
            try
            {
                var packId = CreatePacketIdentifier();
                UnsubscribePackage unsubscribe = new UnsubscribePackage(packId, topicFilters);
                ConnectedSocket.Send(unsubscribe.ToBytes());
            }
            catch (Exception e)
            {
                ProductLine.ErrorHandle?.Invoke(this, e);
                Disconnect();
            }
        }
        /// <summary>
        /// 发送报文
        /// </summary>
        /// <param name="package"></param>
        private void SendPackage(IMQTTPackage package)
        {
            try
            {
                ConnectedSocket.Send(package.ToBytes());
            }
            catch (Exception e)
            {
                ProductLine.ErrorHandle?.Invoke(this, e);
                Disconnect();
            }
        }

        /// <summary>
        /// 断开连接
        /// </summary>
        public void Disconnect()
        {
            if (ConnectedSocket.Connected)
            {
                DisconnectPackage disconnect = new DisconnectPackage(0);
                ConnectedSocket.Send(disconnect.ToBytes());
                ConnectedSocket.Disconnect(true);
            }
            IsConnected = false;
            Timer.Stop();
        }

        /// <summary>
        /// 释放资源
        /// </summary>
        public void Dispose()
        {
            IsDispose = true;
            Timer.Stop();
            Timer.Elapsed -= Timer_Elapsed;
            Timer.Close();
            Timer.Dispose();
            ConnectedSocket.Disconnect(false);
            ConnectedSocket.Shutdown(SocketShutdown.Both);
            ConnectedSocket.Close();
            ConnectedSocket.Dispose();
            Packages.OnQueueHasElements -= Packages_OnQueueHasElements;
        }

        /// <summary>
        /// 开始接受数据
        /// </summary>
        /// <param name="client"></param>
        private void StartReceive(Socket client)
        {
            //Task.Run(() => client.BeginReceive(Buffer, 0, Buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveResult), client));
            client.BeginReceive(Buffer, 0, Buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveResult), client);
            Task reading = ReadPipeAsync(client, Pipe.Reader);
        }

        private async void ReceiveResult(IAsyncResult ar)
        {
            var client = ar.AsyncState as Socket;
            try
            {
                if (client != null)
                {
                    var lenght = client.EndReceive(ar);
                    if (lenght == 0) return;
                    Memory<byte> memory = Pipe.Writer.GetMemory(MINSIZE);
                    Buffer.CopyTo(memory);
                    Pipe.Writer.Advance(lenght);
                    await Pipe.Writer.FlushAsync();
                    client.BeginReceive(Buffer, 0, Buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveResult), client);
                }
            }
            catch (Exception e)
            {
                if (client != null)
                {
                    if (client.Connected)
                    {
                        DisconnectPackage disconnect = new DisconnectPackage();
                        client.Send(disconnect.ToBytes());
                        client.Disconnect(true);
                    }
                }
                ProductLine.ErrorHandle?.Invoke(this, e);
            }
        }
        /// <summary>
        /// 读取管线
        /// </summary>
        /// <param name="socket"></param>
        /// <param name="reader"></param>
        /// <returns></returns>
        private async Task ReadPipeAsync(Socket socket, PipeReader reader)
        {
            try
            {
                while (true)
                {
                    ReadResult result = await reader.ReadAsync();
                    ReadOnlySequence<byte> buffer = result.Buffer;
                    while (!buffer.IsEmpty)
                    {
                        var package = MQTTDecoder.Decode(ref buffer);
                        if (package != null)
                        {
                            Packages.Enqueue(package);
                        }
                        else break;
                    }
                    reader.AdvanceTo(buffer.Start, buffer.End);
                    if (result.IsCompleted) break;
                }
            }
            catch (Exception e)
            {
                ProductLine.ErrorHandle?.Invoke(this, e);
                Disconnect();
            }
            finally
            {
                reader.Complete();
            }
        }

        /// <summary>
        /// 队列事件
        /// </summary>
        /// <param name="obj"></param>
        private void Packages_OnQueueHasElements(LoopQueue<IMQTTPackage> obj)
        {
            while (!obj.IsEmpty)
            {
                obj.Dequene(out IMQTTPackage package);
                Task.Run(() =>
                {
                    try
                    {
                        switch (package)
                        {
                            case ConnackPackage connack:
                                switch (connack.ConnectReturnCode)
                                {
                                    case ReturnCodeType.Accept:
                                        IsConnected = true;
                                        break;
                                    case ReturnCodeType.Authorize:
                                        throw new MQTTClientException(this, "Authorize Error");
                                    case ReturnCodeType.ClientIdError:
                                        throw new MQTTClientException(this, "ClientId Error");
                                    case ReturnCodeType.ProtocolError:
                                        throw new MQTTClientException(this, "Protocol Error");
                                    case ReturnCodeType.ServerError:
                                        throw new MQTTClientException(this, "Server Error");
                                    case ReturnCodeType.UserError:
                                        throw new MQTTClientException(this, "User Error");
                                }
                                ProductLine.ConnackHandle?.Invoke(this, connack);
                                break;
                            case PublishPackage publish:
                                ProductLine.PublishHandle?.Invoke(this, publish);
                                break;
                            case PubackPackage puback:
                                ProductLine.PubackHandle?.Invoke(this, puback);
                                if (CachePackages.ContainsKey(puback.PacketIdentifier))
                                {
                                    var task = CachePackages[puback.PacketIdentifier];
                                    task.RepeatTask.Interrupt();
                                }
                                break;
                            case PubrecPackage pubrec:
                                ProductLine.PubrecHandle?.Invoke(this, pubrec);
                                PubrelPackage newPubrel = new PubrelPackage(pubrec.PacketIdentifier);
                                SendPackage(newPubrel);
                                break;
                            case PubrelPackage pubrel:
                                ProductLine.PubrelHandle?.Invoke(this, pubrel);
                                break;
                            case PubcompPackage pubcomp:
                                ProductLine.PubcompHandle?.Invoke(this, pubcomp);
                                if (CachePackages.ContainsKey(pubcomp.PacketIdentifier))
                                {
                                    var task = CachePackages[pubcomp.PacketIdentifier];
                                    task.RepeatTask.Interrupt();
                                }
                                break;
                            case SubackPackage suback:
                                ProductLine.SubackHandle?.Invoke(this, suback);
                                break;
                            case UnsubackPackage unsuback:
                                ProductLine.UnsubackHandle?.Invoke(this, unsuback);
                                break;
                            case PingrespPackage pingresp:
                                ProductLine.PingrespHandle?.Invoke(this, pingresp);
                                break;
                            case DisconnectPackage disconnect:
                                Disconnect();
                                break;
                        }
                    }
                    catch (Exception e)
                    {
                        ProductLine.ErrorHandle?.Invoke(this, e);
                        Disconnect();
                    }
                });
            }
        }

        /// <summary>
        /// 计时器方法
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void Timer_Elapsed(object sender, ElapsedEventArgs e)
        {
            SendPackage(new PingreqPackage(0));
        }

        /// <summary>
        /// 创建一个新的报文标识符
        /// </summary>
        /// <param name="oldValue"></param>
        /// <returns></returns>
        private ushort CreatePacketIdentifier(ushort oldValue = 0)
        {
            if (CachePackages.ContainsKey(oldValue) || oldValue == 0)
            {
                oldValue = (ushort)Random.Next(1, ushort.MaxValue);
                CreatePacketIdentifier(oldValue);
            }
            return oldValue;
        }

        /// <summary>
        /// publish包缓存重复发包任务完成事件处理函数
        /// </summary>
        /// <param name="packetId"></param>
        private void Cache_TaskFinish(ushort packetId)
        {
            var task = CachePackages[packetId];
            task.Dispose();
            CachePackages.TryRemove(packetId, out task);
        }
    }
}
